package com.atguigu.realtime.canalclients;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.common.constants.PrefixConstant;
import com.atguigu.common.constants.TopicConstant;
import com.atguigu.common.utils.JedisUtil;
import com.atguigu.common.utils.KafkaClientUtil;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;

/**

        采集 Order_info 和order_detail 到kafka：  insert
        采集 user_info 到 redis  : insert | update
 */
public class SaleDetailClient {

    //发送到redis
    private static Jedis jedis = JedisUtil.getJedisClient();

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop103", 11111), "example", null, null);

        canalConnector.connect();

        canalConnector.subscribe("220212.*");

        // 24h不间断，到canal server端拉取数据
        while (true) {

            //当前拉取到的一批数据
            Message message = canalConnector.get(100);

            //判断当前是否拉取到了数据，拉取到了，就打印，如果没有，歇会继续取拉
            if (message.getId() == -1) {

                System.out.println("当前没有最新数据，歇5s....");

                Thread.sleep(5000);

                //跳过本次循环，开始下次循环
                continue;

            }

            List<CanalEntry.Entry> entries = message.getEntries();

            for (CanalEntry.Entry entry : entries) {

                //获取表名
                String tableName = entry.getHeader().getTableName();

                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {

                    //获取数据
                    ByteString storeValue = entry.getStoreValue();

                    //解析数据
                    parseData(tableName, storeValue);
                }


            }

        }

        //④解析订阅到的数据


    }

    private static void parseData(String table, ByteString storeValue) throws InvalidProtocolBufferException {

        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

        //按照不同的表进行不同的处理
        if ("order_info".equals(table) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {

            sendData(rowChange,TopicConstant.ORDER_INFO,true);

        }else if("order_detail".equals(table) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){

            sendData(rowChange,TopicConstant.ORDER_DETAIL,true);
        }else if("user_info".equals(table) &&
                    (
                            rowChange.getEventType().equals(CanalEntry.EventType.INSERT)
                                    ||
                                    rowChange.getEventType().equals(CanalEntry.EventType.UPDATE)
                    )
                 ){

            sendData(rowChange,null,false);



        }

    }

    private static void sendData(CanalEntry.RowChange rowChange, String topic,boolean ifSaveToKafka) {

        //获取这条sql，插入的所有行
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

        for (CanalEntry.RowData rowData : rowDatasList) {

            //遍历一行
            JSONObject jsonObject = new JSONObject();

            //这一行变化后的列
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();

            for (CanalEntry.Column column : afterColumnsList) {
                jsonObject.put(column.getName(), column.getValue());
            }

            if(ifSaveToKafka){
                int delaySeconds = new Random().nextInt(5);

                //模拟延迟
                /*try {
                    Thread.sleep(delaySeconds * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                //写入Kafka
                KafkaClientUtil.sendDataToKafka(topic, jsonObject.toJSONString());
            }else{
                /*
                        写入redis
                        设计用户在redis中存储的K-V
                                K： 字符串。体现唯一性
                                V：   单值：  string,hash
                                     集合：   set，list，zset

                          参考粒度：
                                        粒度是一个用户一个K-V，单值类型
                                        粒度所有用户一个K-V，  集合类型
                          参考使用场景：
                                        目前的使用场景是 使用 user_id 关联 用户的信息。
                                                假设使用的 所有用户 一个 K-V ，应该在value的集合类型中遍历，找到对应的用户
                                                        时间复杂度：  O(n)

                                        高效：  使用单值类型，粒度是一个用户一个K-V 。 在K体现用户的id。
                                                        时间复杂度:  O(1)

                             ----------------
                             最终选择：   使用string类型，k体现用户的id。一个user一个K-V
                 */
                //获取user_id
                String id = jsonObject.getString("id");
                jedis.set(PrefixConstant.user_info_redis_preffix + id,jsonObject.toJSONString());

            }

        }
    }
}
